package me.tuyou.common;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 定义一个数据源
 */
public class MyLogGenerator implements SourceFunction<MyLog> {

    private boolean isRunning = true;

    @Override
    public void run(SourceContext<MyLog> sourceContext) throws Exception {
        while (isRunning) {
            int deviceId = RandomUtils.nextInt(0, 5);
            String msg = RandomStringUtils.randomAlphanumeric(10);
            MyLog log = new MyLog(deviceId, msg, new Date(), 1);
            sourceContext.collect(log);
            TimeUnit.MILLISECONDS.sleep(RandomUtils.nextInt(1, 1000));
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
